package cn.itcast.flux;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author 虎哥
 */
public class FluxDemo {
    @Test
    public void testFlux() throws InterruptedException {
        // 直接填元素
        // Flux<String> flux = Flux.just("hello", "world", "java");
        // 封装一段行为
        System.out.println("任务开始发布！");
        Flux<String> flux2 = Flux.create(fluxSink -> {
            // 数据发布的行为，例如：写一条sql，查询数据库
            for (int i = 0; i < 5; i++) {
                try {
                    // 模拟延时
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 发布数据
                fluxSink.next("data_" + i);
            }
            // 通知，任务结束
            fluxSink.complete();
        });
        System.out.println("任务发布完毕！");
        // 订阅
        flux2.subscribe(System.out::println);

        Thread.sleep(6000);
    }

    @Test
    public void testMono() {
        System.out.println("任务开始发布！");
        Mono<String> mono = Mono.create(monoSink -> {
            // 数据查询
            try {
                Thread.sleep(1000);
                // 发布数据, 只能发布一次
                monoSink.success("hello");
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 失败情况下，发布错误结果
                monoSink.error(e);
            }

        });
        System.out.println("任务发布完毕！");
        mono.subscribe(System.out::println);
    }
}
